package flinkstudy.batch;

import flinkstudy.bo.HouseInfo;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.SelectByMinFunction;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Examples of the Flink DataSet (batch) transformation operators.
 * <p>
 * Each JUnit test builds a small in-memory data set, applies one kind of transformation
 * and prints the result with {@code print()}, which triggers execution of the batch job.
 *
 * @author daocr
 * @date 2019/12/16
 */
public class Transformations {

    /** Batch execution environment, recreated before every test by {@link #init()}. */
    ExecutionEnvironment env = null;

    @Before
    public void init() {
        env = ExecutionEnvironment.getExecutionEnvironment();
    }

    /**
     * Type conversion: every integer is mapped to a label string.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void map() throws Exception {
        DataSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
        MapOperator<Integer, String> labels = numbers.map(n -> "第" + n + "个");
        labels.print();
    }

    /**
     * Merges a nested collection: the single input element (a list of lists) is flattened
     * into one list of integers, which is emitted as one output element.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void flatMap() throws Exception {

        DataSource<List<List<Integer>>> nested = env.fromElements(
                Arrays.asList(Arrays.asList(1, 2, 3, 4, 5), Arrays.asList(4, 5, 6, 7, 8)));

        // An anonymous class (instead of a lambda) keeps the generic output type List<Integer>
        // visible to Flink's type extraction; for a lambda the Collector's type argument is
        // erased and Flink cannot determine the result type.
        nested.flatMap(new FlatMapFunction<List<List<Integer>>, List<Integer>>() {
            @Override
            public void flatMap(List<List<Integer>> value, Collector<List<Integer>> out) throws Exception {
                List<Integer> merged = new ArrayList<>();
                for (List<Integer> inner : value) {
                    merged.addAll(inner);
                }
                out.collect(merged);
            }
        }).print();
    }

    /**
     * Partition-wise map: the function is called once per parallel partition and receives
     * every element of that partition at once.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void mapPartition() throws Exception {

        DataSource<Integer> dataSource = env.fromCollection(getIntegers(100));

        // fromCollection creates a non-parallel source (parallelism 1), so the parallelism has
        // to be configured on the mapPartition operator; rebalance() spreads the elements
        // across its 10 partitions.
        dataSource.rebalance()
                .mapPartition(new MapPartitionFunction<Integer, String>() {
                    @Override
                    public void mapPartition(Iterable<Integer> values, Collector<String> out) throws Exception {

                        System.out.println("进入 mapPartition ");

                        for (Integer value : values) {
                            out.collect("第" + value + "个");
                        }
                    }
                })
                .setParallelism(10)
                .print();
    }

    /**
     * Filters the data, keeping only elements that are at least 3.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void filter() throws Exception {
        DataSource<Integer> dataSource = env.fromElements(1, 2, 3, 4, 5);
        dataSource.filter(e -> e >= 3).print();
    }

    /**
     * Repeatedly combines two elements into one until a single value remains (here the sum).
     *
     * @throws Exception if the job fails
     */
    @Test
    public void reduce() throws Exception {
        DataSource<Integer> dataSource = env.fromElements(1, 2, 3, 4, 5);

        dataSource.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        }).print();
    }

    /**
     * Group reduce over the whole data set, which may emit several results; here it emits
     * the running (prefix) sum after each element, in iteration order.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void ReduceGroup() throws Exception {
        DataSource<Integer> dataSource = env.fromElements(1, 2, 3, 4, 5);

        dataSource.reduceGroup(new GroupReduceFunction<Integer, Integer>() {
            @Override
            public void reduce(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                int prefixSum = 0;
                for (Integer i : values) {
                    prefixSum += i;
                    out.collect(prefixSum);
                }
            }
        }).print();
    }

    /**
     * Group combine: groups random integers by their own value ({@code "*"} selects the
     * whole atomic value as key) and emits the elements each combine call received as a
     * list, printing them as well. Per the Flink documentation, combineGroup pre-aggregates
     * locally and may therefore see only part of a key's elements per call.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void combineGroup() throws Exception {

        List<Integer> integerList = getIntegers(10000);

        DataSource<Integer> dataSource = env.fromCollection(integerList);
        UnsortedGrouping<Integer> byValue = dataSource.groupBy("*");

        byValue.combineGroup(new GroupCombineFunction<Integer, List<Integer>>() {
            @Override
            public void combine(Iterable<Integer> values, Collector<List<Integer>> out) throws Exception {
                List<Integer> collect = StreamSupport.stream(values.spliterator(), false).collect(Collectors.toList());
                System.out.println("input :" + collect);
                out.collect(collect);
            }
        }).print();
    }

    /**
     * Creates a list of random integers in the range [1, 10).
     *
     * @param count number of integers to generate; a non-positive value yields an empty list
     * @return a new mutable list with {@code count} random integers
     */
    private List<Integer> getIntegers(int count) {
        List<Integer> integerList = new ArrayList<>(Math.max(count, 0));
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < count; i++) {
            integerList.add(random.nextInt(1, 10));
        }
        return integerList;
    }

    /**
     * Simple built-in aggregation: sums field 0 of all tuples.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void aggregate() throws Exception {
        DataSource<Tuple1<Integer>> dataSource = env.fromElements(Tuple1.of(1), Tuple1.of(2), Tuple1.of(3), Tuple1.of(4));
        dataSource.aggregate(Aggregations.SUM, 0).print();
    }

    /**
     * Removes duplicate elements.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void distinct() throws Exception {
        DataSource<Integer> dataSource = env.fromElements(1, 2, 2, 2, 3, 4);

        dataSource.distinct().print();
    }

    /**
     * Inner join on field 0: only pairs whose keys match are emitted.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void join() throws Exception {

        DataSource<Tuple2<Integer, String>> names = env.fromElements(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五"), Tuple2.of(4, "赵六"));

        DataSource<Tuple2<Integer, Integer>> scores = env.fromElements(Tuple2.of(1, 10), Tuple2.of(2, 90), Tuple2.of(3, 88), Tuple2.of(4, 33));

        names.join(scores).where(0).equalTo(0).print();
    }

    /**
     * Left outer join on field 0: every left row is emitted; rows without a match on the
     * right side get the score 0, and rows with several matches are emitted once per match.
     *
     * @throws Exception if the job fails
     * @see DataSource#leftOuterJoin(DataSet)
     * @see DataSource#rightOuterJoin(DataSet)
     * @see DataSource#fullOuterJoin(DataSet)
     */
    @Test
    public void outerJoin() throws Exception {

        DataSource<Tuple2<Integer, String>> names = env.fromElements(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五"), Tuple2.of(4, "赵六"));

        DataSource<Tuple2<Integer, Integer>> scores = env.fromElements(Tuple2.of(1, 10), Tuple2.of(2, 90), Tuple2.of(3, 88), Tuple2.of(3, 99));

        // Explicitly declares the result type of the join function.
        TypeInformation<Tuple2<String, Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
        });

        names.leftOuterJoin(scores).where(0).equalTo(0)
                .with(new FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void join(Tuple2<Integer, String> name, Tuple2<Integer, Integer> score, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // In a left outer join the right side is null when there is no match.
                        Integer value = Optional.ofNullable(score).map(s -> s.f1).orElse(0);
                        out.collect(Tuple2.of(name.f1, value));
                    }
                }).returns(resultType).print();
    }

    /**
     * Co-group on field 0, which handles many-to-many relations: for every key, all matching
     * elements of both inputs are emitted together as a pair of lists.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void coGroup() throws Exception {

        DataSource<Tuple2<Integer, String>> names = env.fromElements(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五"), Tuple2.of(4, "赵六"));

        DataSource<Tuple2<Integer, Integer>> scores = env.fromElements(Tuple2.of(1, 10), Tuple2.of(2, 90), Tuple2.of(3, 88), Tuple2.of(3, 99));

        names.coGroup(scores).where(0).equalTo(0).with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, Integer>, Tuple2<List<Tuple2<Integer, String>>, List<Tuple2<Integer, Integer>>>>() {
            @Override
            public void coGroup(Iterable<Tuple2<Integer, String>> first, Iterable<Tuple2<Integer, Integer>> second, Collector<Tuple2<List<Tuple2<Integer, String>>, List<Tuple2<Integer, Integer>>>> out) throws Exception {

                List<Tuple2<Integer, String>> left = StreamSupport.stream(first.spliterator(), false).collect(Collectors.toList());

                List<Tuple2<Integer, Integer>> right = StreamSupport.stream(second.spliterator(), false).collect(Collectors.toList());

                out.collect(Tuple2.of(left, right));
            }
        }).print();
    }

    /**
     * Builds the Cartesian product (cross product) of two data sets.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void cross() throws Exception {

        DataSource<Tuple2<Integer, String>> names = env.fromElements(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五"), Tuple2.of(4, "赵六"));

        DataSource<Tuple2<Integer, Integer>> scores = env.fromElements(Tuple2.of(1, 10), Tuple2.of(2, 90), Tuple2.of(3, 88), Tuple2.of(3, 99));

        CrossOperator.DefaultCross<Tuple2<Integer, String>, Tuple2<Integer, Integer>> cross = names.cross(scores);

        cross.print();
    }

    /**
     * Concatenates two data sets of the same type.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void union() throws Exception {

        DataSource<Tuple2<Integer, String>> first = env.fromElements(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五"), Tuple2.of(4, "赵六"));

        DataSource<Tuple2<Integer, String>> second = env.fromElements(Tuple2.of(5, "周八"), Tuple2.of(6, "吴九"));

        first.union(second).print();
    }

    /**
     * Groups the tuples by field 1 (the name) and emits each group as a list.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void groupBy() throws Exception {

        DataSource<Tuple2<Integer, String>> dataSource1 = env.fromElements(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "李四"), Tuple2.of(4, "王五"), Tuple2.of(5, "赵六"), Tuple2.of(6, "赵六"), Tuple2.of(7, "赵六"));

        dataSource1.groupBy(1).reduceGroup(new GroupReduceFunction<Tuple2<Integer, String>, List<Tuple2<Integer, String>>>() {
            @Override
            public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<List<Tuple2<Integer, String>>> out) throws Exception {
                List<Tuple2<Integer, String>> group = StreamSupport.stream(values.spliterator(), false).collect(Collectors.toList());
                out.collect(group);
            }
        }).print();
    }

    /**
     * Custom grouping with a {@link KeySelector}: tuples are split into the groups
     * "f0 below 3" and "f0 at least 3", and each group is emitted as a list.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void customizeGroupBy() throws Exception {

        DataSource<Tuple2<Integer, String>> dataSource1 = env.fromElements(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "李四"), Tuple2.of(4, "王五"), Tuple2.of(5, "赵六"), Tuple2.of(6, "赵六"), Tuple2.of(7, "赵六"));

        UnsortedGrouping<Tuple2<Integer, String>> tuple2UnsortedGrouping = dataSource1.groupBy(new KeySelector<Tuple2<Integer, String>, String>() {
            @Override
            public String getKey(Tuple2<Integer, String> value) throws Exception {
                // f0 == 3 belongs to the second group, hence "at least 3" rather than "above 3".
                return value.f0 < 3 ? "小于3" : "大于等于3";
            }
        });

        GroupReduceOperator<Tuple2<Integer, String>, List<Tuple2<Integer, String>>> tuple2ListGroupReduceOperator = tuple2UnsortedGrouping.reduceGroup(new GroupReduceFunction<Tuple2<Integer, String>, List<Tuple2<Integer, String>>>() {
            @Override
            public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<List<Tuple2<Integer, String>>> out) throws Exception {
                List<Tuple2<Integer, String>> group = StreamSupport.stream(values.spliterator(), false).collect(Collectors.toList());
                out.collect(group);
            }
        });

        // Without a sink the job would never run.
        tuple2ListGroupReduceOperator.print();
    }

    /**
     * Grouping plus sorting: groups by field 1, sorts each group by field 0 ascending and
     * keeps the first 2 elements of every group (similar to a per-group LIMIT).
     *
     * @throws Exception if the job fails
     * @see SortedGrouping#first(int) limit
     * @see Order
     */
    @Test
    public void sortGroup() throws Exception {

        DataSource<Tuple2<Integer, String>> dataSource1 = env.fromElements(Tuple2.of(1, "张三"),
                Tuple2.of(2, "李四"), Tuple2.of(3, "李四"),
                Tuple2.of(4, "赵六"), Tuple2.of(5, "赵六"), Tuple2.of(6, "赵六"));

        dataSource1.groupBy(1).sortGroup(0, Order.ASCENDING).first(2).print();
    }

    /**
     * Projection, similar to selecting columns in SQL: keeps only field 1 of each tuple.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void Project() throws Exception {

        DataSource<Tuple2<Integer, String>> dataSource1 = env.fromElements(Tuple2.of(1, "张三"),
                Tuple2.of(2, "李四"), Tuple2.of(3, "李四"),
                Tuple2.of(4, "赵六"), Tuple2.of(5, "赵六"), Tuple2.of(6, "赵六"));

        dataSource1.project(1).print();
    }

    /**
     * Selects the tuple with the maximum and then the one with the minimum field 0.
     * Each {@code print()} runs a separate job.
     *
     * @throws Exception if a job fails
     */
    @Test
    public void minByMaxBy() throws Exception {

        DataSource<Tuple2<Integer, String>> dataSource1 = env.fromElements(Tuple2.of(1, "张三"),
                Tuple2.of(2, "李四"), Tuple2.of(3, "李四"),
                Tuple2.of(4, "赵六"), Tuple2.of(5, "赵六"), Tuple2.of(6, "赵六"));

        dataSource1.maxBy(0).print();
        dataSource1.minBy(0).print();
    }

    /**
     * Sorts the data by field 0 in descending order.
     *
     * @throws Exception if the job fails
     */
    @Test
    public void sort() throws Exception {
        DataSource<Tuple2<Integer, String>> dataSource1 = env.fromElements(Tuple2.of(1, "张三"),
                Tuple2.of(2, "李四"), Tuple2.of(3, "李四"),
                Tuple2.of(4, "赵六"), Tuple2.of(5, "赵六"), Tuple2.of(6, "赵六"));

        // Sort on the numeric key (a String key would order "10" before "2"), and use a
        // single partition because sortPartition only orders elements within each partition.
        dataSource1.sortPartition(new KeySelector<Tuple2<Integer, String>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, String> value) throws Exception {
                return value.f0;
            }
        }, Order.DESCENDING)
                .setParallelism(1)
                .print();
    }

    /**
     * Placeholder for an average example; no transformation is implemented yet.
     *
     * @throws Exception never thrown at the moment
     */
    @Test
    public void avg() throws Exception {
        // TODO: not implemented yet
    }

    /**
     * Empty template for new examples; no transformation is implemented.
     *
     * @throws Exception never thrown at the moment
     */
    @Test
    public void tpl() throws Exception {
        // intentionally empty
    }
}
